package com.starzy.mr;

import java.io.IOException;
import java.util.regex.Pattern;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * @Author: starzy https://www.cnblogs.com/starzy
 * @Description:
 * @Date: Created in 10:27 2020/12/15
 */
public class DistinctWordMR {
    public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
        // 指定hdfs相关的参数
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hadoop06:9000");
        System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // 设置jar包所在路径
        job.setJarByClass(DistinctWordMR.class);

        // 指定mapper类和reducer类
        job.setMapperClass(DistinctWordMRMapper.class);
        job.setReducerClass(DistinctWordMRReducer.class);

        // 指定maptask的输出类型
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 指定reducetask的输出类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // 指定该mapreduce程序数据的输入和输出路径
//		Path inputPath = new Path("d:/wordcount/input");
//		Path outputPath = new Path("d:/wordcount/output");
        Path inputPath = new Path("/wc/input");
        Path outputPath = new Path("/wc/output");
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 最后提交任务
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);

    }

    /**
     * 描述：单词去重MR中的mapper组件。 读取文件然后切分出单词
     */
    private static class DistinctWordMRMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        private Text outkey = new Text();

        /**
         * 在单词计数的场景中。 把单词作为key输出即可， 不用输出value
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

            String[] split = value.toString().split(" ");
            for (String word : split) {
                outkey.set(word);
                context.write(outkey, NullWritable.get());
            }
        }
    }

    /**
     * 描述：单词去重的MR程序的reducer组件
     */
    private static class DistinctWordMRReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

            /**
             * reduce方法没调用一次，就接收到一组相同的单词。所以，在此因为是去重的业务，所以直接输出一次key即可。就表示这一组单词就取一个。就相当于实现去重的业务
             */
            context.write(key, NullWritable.get());
        }
    }

}
